Audioデータをクラウドに送ってみました。 Amazon Kinesis Data Streams + Amazon Kinesis Data Firehose + S3 (バイナリ)
1 はじめに
CX事業本部の平内(SIN)です。
ここまで、エッジ側のAudioデータをクラウドへ送信する要領をいくつか確認してみました。
上記は、すべて、JSON形式のデータとして送信しましたが、Amazon Kinesis Data StreamsもAmazon Kinesis Data Firehoseもboto3のパラメータは、バイナリ(内部でシリアル化される時、Base64で変換されている)となっています。
そこで、今回は、Audioデータをテキスト化するオーバーヘッドを無くすために、バイナリ形式のまま送信する要領を確認してみました。
2 構成
構成は、前回と同じです。
デバイスからは、500msec単位で、Audioデータ(バイナリ形式)とタイムスタンプ(バイナリ形式)を結合して送信します。
このため、Amazon Kinesis Data Streams、Amazon Kinesis Data Firehose及び、S3バケットでは、バイナリ形式のまま保存されます。
データは、必要に応じて、Lambdaで処理(デコード)されます。
3 バイナリ形式のデータ
500msec分のRAWデータは、8,000byteですが、これにタイムスタンプ(8byte)を足して、1つのデータ単位は8,008byteとしました。
下記は、最終的にS3に保存された状況です。
やや、バラバラなサイズに見えますが、ダウンロードしてサイズを確認すると、8,008の倍数になっていることが分かります。
-rw-r--r--@ 1 976976 4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-41-45-4f667d04-0ac6-457c-b274-54a9ac67375b -rw-r--r--@ 1 840840 4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-42-46-85bcf123-dddd-4d11-98d8-5af1c4b65db9 -rw-r--r--@ 1 968968 4 17 20:53 audio_transmission_firehose-1-2021-04-17-11-43-48-fd291ba6-f423-4651-8c3c-024f26852d3d -rw-r--r--@ 1 976976 4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-44-48-510ca90f-b905-4b99-96ba-a41192285941
976976/8008 = 122 840840/8008 = 105 968968/8008 = 121 976976/8008 = 122
下記のコードは、AudioのRAWデータ(擬似的に10byteのデータ)と、タイムスタンプを結合して、再び、分解しているサンプルです。
pack_unpack.py
import datetime import struct def disp(dt, data): print("{} {} type:{} len:{}".format(dt, data, type(data), len(data))) # 転送前のデータ print("- before -") raw_data = bytes(range(0, 10)) now = datetime.datetime.now() disp(now, raw_data) # 時刻とデータの結合 transfer_data = struct.pack('<d', now.timestamp()) + raw_data print("- transfer -") print("{} type:{} len:{}".format(transfer_data, type(transfer_data), len(transfer_data))) # 転送後のデータ print("- after -") # 時刻とデータの分離 ts = transfer_data[:8] raw_data_2 = transfer_data[8:] now_2 = datetime.datetime.fromtimestamp(struct.unpack('<d', ts)[0]) disp(now_2, raw_data_2)
$ python3 pack_unpack.py - before - 2021-04-17 21:42:35.371176 b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:10 - transfer - b'Y\xc1\xd7\xee\xb5\x1e\xd8A\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:18 - after - 2021-04-17 21:42:35.371176 b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:10
このコードを使用して、既存のコードをバイナリ対応に変更します。
4 エッジ側のコード
エッジ側で500ms毎に、Amazon Kinesis Data Streamsに送信しているコードです。
JSONの時は、改行でデータ単位を分割しましたが、今回は、1データのサイズが固定なので、セパレータはありません。
index.py
import pyaudio from producer import Producer import numpy as np producer = Producer() DEVICE_INDEX = 0 CHANNELS = 2 SAMPLE_RATE = 32000 # サンプルレート FORMAT = pyaudio.paInt16 CHUNK = int(SAMPLE_RATE/2) # 500msごとに取得する p = pyaudio.PyAudio() stream = p.open(format = FORMAT, channels = CHANNELS, rate = SAMPLE_RATE, input = True, input_device_index = DEVICE_INDEX, frames_per_buffer = CHUNK) try: print("start ...") while True: # 500ms秒分のデータ読み込み data = stream.read(CHUNK) # numpy配列に変換 data = np.frombuffer(data, dtype="int16") # チャンネル 2ch -> 1ch data = data[0::2] # サンプルレート 32000Hz -> 8000Hz data = data[0::4] # byteに戻す data = data.tobytes() # Amazon Kinesis Data Streamsへの送信 producer.send(data) except: stream.stop_stream() stream.close() p.terminate()
producer.py
import json from datetime import datetime import base64 import boto3 from boto3.session import Session import time import random import struct class Producer(): def __init__(self): self.__identity_id = "ap-northeast-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" self.__region = "ap-northeast-1" self.__stream_name = 'audio_transmission_data_stream' self.__kinesis = self.__get_kinesis() def __get_kinesis(self): client = boto3.client('cognito-identity', self.__region) resp = client.get_id(IdentityPoolId = self.__identity_id) resp = client.get_credentials_for_identity(IdentityId=resp['IdentityId']) secretKey = resp['Credentials']['SecretKey'] accessKey = resp['Credentials']['AccessKeyId'] token = resp['Credentials']['SessionToken'] session = Session(aws_access_key_id = accessKey, aws_secret_access_key = secretKey, aws_session_token = token, region_name = self.__region) return session.client('kinesis') def send(self, data): now = datetime.now() # 時刻とデータの結合 ts = now.timestamp() ts = struct.pack('<d', ts) transfer_data = ts + data try: response = self.__kinesis.put_record( StreamName = self.__stream_name, PartitionKey = str(random.randrange(0,100)), Data = transfer_data ) except Exception as e: print("Exception: {}", e.args) print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))
5 Lambda(リアルタイムデータ処理)
Amazon Kinesis Data StreamsをトリガーするLambdaのコードです。
レコードからタイムスタンプとRAWデータを分割して、最大値(ボリューム)を取得して、IoT Coreのトピックに送っています。
import json import datetime import os import boto3 import base64 import numpy as np import struct def lambda_handler(event, context): topic = 'topic/level_meter' iot = boto3.client('iot-data') records = event["Records"] for record in records: data = base64.b64decode(record["kinesis"]["data"]) print("{} {}".format(type(data), len(data))) # データを分離 ts = data[:8] raw_data = data[8:] ts = struct.unpack('<d', ts)[0] now = datetime.datetime.fromtimestamp(ts) + datetime.timedelta(hours=9) # 1データを2byteとして扱う raw_data = np.frombuffer(raw_data, dtype="int16") # 最大値を取得する max = int(raw_data.max()) payload = { "timestamp": now.strftime('%Y-%m-%dT%H:%M:%S.%f'), "level": max } print("payload:{}".format(payload)) iot.publish( topic=topic, qos=0, payload=json.dumps(payload) ) return {}
6 Lambda(wavデータ生成)
S3に保存される形式が変わったので、wavを作成するLambdaも変更されています。
import json import datetime import os import boto3 import base64 import wave import struct def lambda_handler(event, context): BUCKET_NAME = os.environ['BUCKET_NAME'] output_filename = event["output_filename"] start_time_str = event["start_time"] period_sec = int(event["period_sec"]) print("start_time:{} period_sec:{}".format(start_time_str, period_sec)) # 開始時間と終了時間(JST) start_time_jst = datetime.datetime.strptime(start_time_str, '%Y-%m-%d %H:%M:%S') end_time_jst = start_time_jst + datetime.timedelta(seconds=period_sec) print("JST {} - {} ".format(start_time_jst, end_time_jst)) # S3上のオブジェクト名を検索するため # 開始時間と終了時間(UTC) ファイル検索用なので、開始時間は、1分前とする start_time_utc = start_time_jst + datetime.timedelta(hours=-9) end_time_utc = start_time_utc + datetime.timedelta(seconds=period_sec) start_time_utc = start_time_utc + datetime.timedelta(minutes=-1) print("UTC {} - {}".format(start_time_utc, end_time_utc)) s3client = boto3.client('s3') if("IsLocal" in os.environ and os.environ["IsLocal"] == "Yes"): session = boto3.Session(profile_name="developer") s3client = session.client('s3') # 当日分のオブジェクト名を列挙する prefix = "{:4d}/{:02d}/{:02d}".format( start_time_utc.year, start_time_utc.month, start_time_utc.day ) response = s3client.list_objects_v2( Bucket = BUCKET_NAME, Prefix = prefix ) # オブジェクト名から、対象期間ヒットするオブジェクトを列挙する target_keys = [] if("Contents" in response): for content in response["Contents"]: # オブジェクト名からtimestampを取得する key = content["Key"] tmp = key.split('/')[4].split('-') year = int(tmp[2]) month = int(tmp[3]) day = int(tmp[4]) hour = int(tmp[5]) minute = int(tmp[6]) second = int(tmp[7]) dt = datetime.datetime(year, month, day, hour, minute, second, microsecond=0) if(start_time_utc <= dt and dt <= end_time_utc): target_keys.append(key) target_keys.sort() # オブジェクトをダウンロードして、timestampが取得期間ヒットするRAWデータを取得する frames = [] for key in target_keys: body = s3client.get_object(Bucket=BUCKET_NAME, Key=key)['Body'].read() # 8008byte単位で処理する data_size = 8008 start = 0 for _ in range(int(len(body)/data_size)): data = body[start:start+data_size] raw_data = data[8:] ts = data[:8] ts = struct.unpack('<d', ts)[0] now = datetime.datetime.fromtimestamp(ts) time_str = now.strftime('%Y-%m-%dT%H:%M:%S.%f') timestamp = datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f') # 取得期間のデータは、framesに追加する if(start_time_jst <= timestamp and timestamp <= end_time_jst): frames.append(raw_data) start += data_size data = b''.join(frames) # RAWデータをwavファイルとして保存する CHANNELS = 1 # 1ch SAMPLE_RATE = 8000 # 8kHz SAMPLE_WIDTH = 2 tmp_file_name = "/tmp/tmp.wav" wf = wave.open(tmp_file_name, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(SAMPLE_WIDTH) wf.setframerate(SAMPLE_RATE) wf.writeframes(data) wf.close() s3client.upload_file(tmp_file_name, BUCKET_NAME, output_filename) return {}
7 最後に
今回は、データをバイナリで扱う要領を確認してみました。
オーバーヘッドとデータサイズは、確実に減っていると思いますが、途中でデータをトリガーするには、やはり、少し面倒になります。
要件によって検討ということでしょうか・・・
全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_4